网络层设计与实现
undefined网络层设计与实现
undefinedNode
一个网络节点(Node)命名为Network。
Node用Network来定义和实现,特指P2P网络节点,更体现Node的本质。
// Network 节点的数据结构type Network struct {Host host.Host//主机GeneralChannel *Channel//通用节点MiningChannel *Channel//挖矿节点FullNodesChannel *Channel//全节点Blockchain *blockchain.BlockchainBlocks chan *blockchain.Block//Block类型的通道Transactions chan *blockchain.Transaction//Transaction类型的通道Miner bool}
undefinedChannel
Channel为通信通道,每个host有三个通信通道,但根据其节点的类别,一般一个节点只用到其中一个通信通道。
// Channel 的数据结构type Channel struct {ctx context.Contextpub *pubsub.PubSub//发布者topic *pubsub.Topicsub *pubsub.Subscription//订阅者channelName string//构成Topic名称字符串的组成部分(TopicName="channel:" + channelName)self peer.IDContent chan *ChannelContent//ChannelContent类型的通道}
GeneralChannel为通用节点,负责列举所有连接到主机(host)的所有peer,这也是所有连接到host的peer,处理除了tx之外的所有命令消息。
FullNodesChannel为全节点,处理与交易相关的tx及gettxfrompool命令,即将新交易放到内存池,以及每秒不断将交易从交易池中取出(这里我们每秒只取出一条交易,可以优化为每次取出多条交易)给挖矿节点进行挖矿。
MiningChannel为挖矿节点,处理来自交易池的inv命令及来自交易池的tx命令。
undefinedHost
P2P的host package定义了Host这一interface。
// 为本主机(host)创建一对新的 RSA 密钥prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)if err != nil {panic(err)}transports := libp2p.ChainOptions(libp2p.Transport(tcp.NewTCPTransport),//支持TCP传输协议libp2p.Transport(ws.New),//支持websorcket传输协议)muxers := libp2p.ChainOptions(libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),//支持"/yamux/1.0.0"流连接(基于可靠连接的多路I/O复用)libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),//支持"/mplex/6.7.0"流连接(二进制流多路I/O复用),由LibP2P基于multiplex创建)if len(listenPort) == 0 {listenPort = "0"}listenAddrs := libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", listenPort),//支持tcp传输fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", listenPort),//支持websorket传输)// Host是参与p2p网络的对象,它实现协议或提供服务。// 它像服务器一样处理请求,像客户端一样发出请求。// 之所以称为 Host,是因为它既是 Server 又是 Client(而 Peer 可能会混淆)。// 1、创建host// 重要:创建主机host//-如果没有提供transport和listen addresses,节点将监听在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0";//-如果没有提供transport的选项,节点使用TCP和websorcket传输协议//-如果multiplexer配置没有提供,节点缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流连接配置//-如果没有提供security transport,主机使用go-libp2p的noise和/或tls加密的transport来加密所有的traffic(新版本libp2p已经不再支持security transport参数设置)//-如果没有提供peer的identity,它产生一个随机RSA 2048键值对,并由它导出一个新的identity//-如果没有提供peerstore,主机使用一个空的peerstore来进行初始化host, err := libp2p.New(ctx,transports,listenAddrs,muxers,libp2p.Identity(prvKey),)
上述代码中第一步创建Host:
host, err := libp2p.New(...)
我们追溯New函数,它来自于libp2p.go,最终调用的是:
func NewWithoutDefaults(ctx context.Context, opts ...Option) (host.Host, error) {varcfg Configif err := cfg.Apply(opts...); err != nil {returnnil, err}return cfg.NewNode(ctx)}
我们继续追溯cfg.NewNode(ctx),在P2Plib的config.go,关键代码如下:
func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {swrm, err := cfg.makeSwarm(ctx)if err != nil {returnnil, err}h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{ConnManager: cfg.ConnManager,AddrsFactory: cfg.AddrsFactory,NATManager: cfg.NATManager,EnablePing: !cfg.DisablePing,UserAgent: cfg.UserAgent,})...h.Start()if router != nil {return outed.Wrap(h, router), nil}return h, nil}
security transport,默认的值为:
var DefaultSecurity = libp2p.ChainOptions(Security(noise.ID, noise.New),Security(tls.ID, tls.New),)
上述代码的第一个关键是:
swrm, err := cfg.makeSwarm(ctx)
我们追溯进去,看看cfg.makeSwarm(ctx):
func (cfg *Config) makeSwarm(ctx context.Context) (*swarm.Swarm, error) {//从config保存的公钥得到pidpid, err := peer.IDFromPublicKey(cfg.PeerKey.GetPublic())...swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.ConnectionGater)return swrm, nil
我们继续追溯swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.ConnectionGater):
func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc metrics.Reporter, extra ...interface{}) *Swarm {s := &Swarm{local: local,peers: peers,bwc: bwc,}...return s
可见,peer.ID被赋值到Swarm对象的local变量。
我们回到函数:
func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {swrm, err := cfg.makeSwarm(ctx)if err != nil {returnnil, err}h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{ConnManager: cfg.ConnManager,AddrsFactory: cfg.AddrsFactory,NATManager: cfg.NATManager,EnablePing: !cfg.DisablePing,UserAgent: cfg.UserAgent,})...h.Start()if router != nil {return outed.Wrap(h, router), nil}return h, nil}
前面已经讨论完了swrm, err := cfg.makeSwarm(ctx),我们继续往下看,swrm成为创建Host的一个参数:
h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{ConnManager: cfg.ConnManager,AddrsFactory: cfg.AddrsFactory,NATManager: cfg.NATManager,EnablePing: !cfg.DisablePing,UserAgent: cfg.UserAgent,})
bhost是一个package:
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
我们查看上面的NewHost,进入到basichost package(basic_host.go):
定义了basichost:
type BasicHost struct
然后BasicHost实现了Host的所有接口方法,其中NewHost接口实现如下:
func NewHost(ctx context.Context, n network.Network, opts \*HostOpts) (\*BasicHost, error) {hostCtx, cancel := context.WithCancel(ctx)h := &BasicHost{network: n,mux: msmux.NewMultistreamMuxer(),negtimeout: DefaultNegotiationTimeout,AddrsFactory: DefaultAddrsFactory,maResolver: madns.DefaultResolver,eventbus: eventbus.NewBus(),addrChangeChan: make(chanstruct{}, 1),ctx: hostCtx,ctxCancel: cancel,disableSignedPeerRecord: opts.DisableSignedPeerRecord,}...return h, nil
swrm作为参数传给了n network.Network。而实现的接口:
func (h *BasicHost) ID() peer.ID {return h.Network().LocalPeer()}
h.Network()返回Swarm对象(Swarm是一个struct,实现了接口network.Network(Network是一个interface))
func (h *BasicHost) Network() network.Network {return h.network}
我们看看Swarm的函数LocalPeer(),正好返回的是local(即peer的ID):
func (s *Swarm) LocalPeer() peer.ID {return s.local}
undefined小结
1、主机实际上是BasicHost struct,它实现了Host interface,peer.ID在创建host时候已经在Host中得到了(host.ID()得到的即是peer.ID)。
2、同时Swarm struct实现了libp2p的network.Network interface。
3、BasicHost和Swarm均由p2plib提供。
undefinedPeer
Peer为对等端,是host的第三方视觉的概念。
Peer以ID为唯一标识,peer.ID是通过哈希peer的公钥而派生,并编码其哈希输出为multihash的结果。
peer.ID是往后不同节点之间进行通信的重要参数,它代表一个Host,或者说,我们可以通过peer.ID获得一个具体的Host对象。如发送虚拟币:
func (net *Network) SendTx(peerId string, transaction *blockchain.Transaction) {memoryPool.Add(*transaction)tnx := Tx{net.Host.ID().Pretty(), transaction.Serializer()}payload := GobEncode(tnx)request := append(CmdToBytes("tx"), payload...)// 给全节点(FullNodes)第通信通道发布此消息,全节点将进行处理net.FullNodesChannel.Publish("接收到 Send transaction 命令", request, peerId)}
如同Host一样,peer package也是在libp2p库中定义,所在的文件是peer.go,不同的是,在peer.go中并没有定义一个peer的struct,而是直接在peer package中定义ID:
type ID string
但显然ID是一个mutihash的值,如需要对外呈现需要使用base58编码后得到人可以识别的字符串:
func (id ID) String() string {return id.Pretty()}
Pretty方法如下:
func (id ID) Pretty() string {returnIDB58Encode(id)}
undefined网络通信流程
一切从startNode开始。
main.go:
cli.StartNode(listenPort, minerAddress, miner, fullNode, func(net *p2p.Network) {//最后一个参数是回调函数,获得net实例if rpc {cli.P2p = net//启动节点后设置cli的P2P实例,net为启动节点函数的回调函数参数被回调后返回的Network实例go jsonrpc.StartServer(cli, rpc, rpcPort, rpcAddr)}})
其中cli的结构:
type CommandLinestruct {Blockchain *blockchain.BlockchainP2p *p2p.NetworkCloseDbAlways bool//每次命令执行完毕是否关闭数据库}
其中istenPort, minerAddress, miner, fullNode等参数的值来自于命令startnode执行时获得的命令行参数。
cli.StartNode实现:
// StartNode 启动节点,其中fn为回调函数,p2p.StartNode调用过程中调用fn,设置p2p.Network实例func (cli *CommandLine) StartNode(listenPort, minerAddress string, miner, fullNode bool, fn func(*p2p.Network)) {if miner {log.Infof("作为矿工正在启动节点: %s\\n", listenPort)iflen(minerAddress) > 0 {if wallet.ValidateAddress(minerAddress) {log.Info("正在挖矿,接收奖励的地址是:", minerAddress)} else {log.Fatal("请提供一个合法的矿工地址")}}} else {log.Infof("在: %s\\n端口上启动节点", listenPort)}chain := cli.Blockchain.ContinueBlockchain()p2p.StartNode(chain, listenPort, minerAddress, miner, fullNode, fn)}
在获得了blockchain实例后,调用p2p package的StartNode函数:
// StartNode 启动一个节点func StartNode(chain *blockchain.Blockchain, listenPort, minerAddress string, miner, fullNode bool, callback func(*Network)) {var r io.Readerr = rand.Reader//没有指定seed,使用随机种子MinerAddress = minerAddressctx, cancel := context.WithCancel(context.Background())defercancel()defer chain.Database.Close()//函数运行结束,关闭区块链数据库go appUtils.CloseDB(chain)//启动协程,遇到程序强行终止信号时关闭数据库,退出程序// 为本主机(host)创建一对新的 RSA 密钥prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)if err != nil {panic(err)}transports := libp2p.ChainOptions(libp2p.Transport(tcp.NewTCPTransport),//支持TCP传输协议libp2p.Transport(ws.New),//支持websorcket传输协议)muxers := libp2p.ChainOptions(libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),//支持"/yamux/1.0.0"流连接(基于可靠连接的多路I/O复用)libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),//支持"/mplex/6.7.0"流连接(二进制流多路I/O复用),由LibP2P基于multiplex创建)if len(listenPort) == 0 {listenPort = "0"}listenAddrs := libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", listenPort),//支持tcp传输fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", listenPort),//支持websorket传输)// Host是参与p2p网络的对象,它实现协议或提供服务。// 它像服务器一样处理请求,像客户端一样发出请求。// 之所以称为 Host,是因为它既是 Server 又是 Client(而 Peer 可能会混淆)。// 1、创建host// 重要:创建主机host//-如果没有提供transport和listen addresses,节点将监听在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0";//-如果没有提供transport的选项,节点使用TCP和websorcket传输协议//-如果multiplexer配置没有提供,节点缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流连接配置//-如果没有提供security transport,主机使用go-libp2p的noise和/或tls加密的transport来加密所有的traffic(新版本libp2p已经不再支持security transport参数设置)//-如果没有提供peer的identity,它产生一个随机RSA 2048键值对,并由它导出一个新的identity//-如果没有提供peerstore,主机使用一个空的peerstore来进行初始化host, err := libp2p.New(ctx,transports,listenAddrs,muxers,libp2p.Identity(prvKey),)if err != nil {panic(err)}for _, addr := range host.Addrs() {fmt.Println("正在监听在", addr)}log.Info("主机已创建: ", host.ID())// 2、使用GossipSub路由,创建一个新的基于Gossip 协议的 PubSub 服务系统// 任何一个主机节点,都是一个订阅发布服务系统// 这是整个区块链网络运行的关键所在pub, err := pubsub.NewGossipSub(ctx, host)if err != nil {panic(err)}// 3、构建三个通信通道,通信通道使用发布-订阅系统,在不同节点之间传递信息// 之所以需要三个通道,是因为未来规划不同节点拥有不同的功能,不同功能的节点完成不同类型的任务。// 三个通道的消息独立,只有订阅了该通道消息的节点,才能收到该通道的消息,然后进行处理,以完成相应的任务。// 任何一个节点,均创建了三个通道实例,这意味着人一个节点都可以根据需要,选择任意一个通道发送消息// 在订阅上,一个具体的节点, GeneralChannel 订阅将消息,如果是采矿节点(miner==true),miningChannel 会接收到消息,// 如果是全节点(fullNode==true),fullNodesChannel会接受到消息//GeneralChannel 通道订阅消息generalChannel, _ := JoinChannel(ctx, pub, host.ID(), GeneralChannel, true)subscribe := falseif miner {subscribe = true}//如果是挖矿节点, miningChannel 订阅消息,否则 miningChannel 不订阅消息miningChannel, _ := JoinChannel(ctx, pub, host.ID(), MiningChannel, subscribe)subscribe = falseif fullNode {subscribe = true}//如果是全节点, fullNodesChannel 订阅消息,否则 fullNodesChannel 不订阅消息fullNodesChannel, _ := JoinChannel(ctx, pub, host.ID(), FullNodesChannel, subscribe)// 3、为各通信通道建立命令行界面对象ui := NewCLIUI(generalChannel, miningChannel, fullNodesChannel)// 4、建立对等端(peer)发现机制(discovery),使得本节点可以被网络上的其它节点发现//同时将主机(host)连接到所有已经发现的对等端(peer)err = SetupDiscovery(ctx, host)if err != nil {panic(err)}network := &Network{Host: host,GeneralChannel: generalChannel,MiningChannel: miningChannel,FullNodesChannel: fullNodesChannel,Blockchain: chain,Blocks: make(chan *blockchain.Block, 200),Transactions: make(chan *blockchain.Transaction, 200),Miner: miner,}// 5、回调,将节点(network)实例传回callback(network)// 6、向全网请求区块信息,以补全本地区块链// 每一个节点均有区块链的一个完整副本err = RequestBlocks(network)// 7、启用协程,处理节点事件goHandleEvents(network)// 8、如果是矿工节点,启用协程,不断发送ping命令给全节点if miner {// 矿工事件循环,以不断地发送一个ping给全节点,目的是得到新的交易,为它挖矿,并添加到区块链go network.MinersEventLoop()}if err != nil {panic(err)}// 9、运行UI界面,将在Run函数体中启动协程,循环接收并处理全网节点publish的消息iferr = ui.Run(network); err != nil {log.Error("运行文字UI发生错误: %s", err)}}
